package org.snake.nebulae.core.task;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.snake.nebulae.core.common.MqttFactory;
import org.snake.nebulae.core.model.ServiceInfo;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

@Component
@Slf4j
public class CoreTask implements Runnable {

    public static String topicPrefix = "nebulae";

    // $share/default/
    // todo 只要sdk适配到5.0 那么就直接可以适配
    public static String topicPrefixShare = "";

    // 单个服务系列后缀
    public static String topicRequestExt = "/request";

    public static String topicResponseExt = "/response";

    public static String topicStatusExt = "/status";

    public static String topicMonitorExt = "/monitor";

    /**
     * 服务注册主题
     */
    public static String topicServices = topicPrefix + "/service";

    /**
     * 服务列表拉取主题
     */
    public static String topicServicesPull = topicServices + topicMonitorExt;

    /**
     * 处理器列表
     */
    public static String topicHandles = topicPrefix + "/handles";

    /**
     * 服务列表
     * 服务 -> [方法 -> [方法实例1,方法实例2 ...], 方法 -> [方法实例1,方法实例2...]]
     */
    private ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentLinkedQueue<ServiceInfo>>> services = new ConcurrentHashMap<>();

    /**
     * 注册完第一次之后执行服务插件位置
     */
    protected CompletableFuture<CoreTask> doRegisteredPlugins = new CompletableFuture<>();

    @Resource
    private MqttFactory mqttFactory;

    private MqttAsyncClient mqttAsyncClient;

    @Resource
    private ObjectMapper objectMapper;

    private CoreTask self;

    @PostConstruct
    private void init() throws MqttException {
        mqttAsyncClient = mqttFactory.getCustomMqttAsyncClient();
        self = this;
    }

    // coreTask的作用在于
    @Override
    public void run() {
        try {
            // 监听服务注册主题
            IMqttActionListener watchRegisterAction = new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    log.info("订阅核心主题 {} 完成", topicServices);
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    log.error("订阅核心主题 {} 出错 {} 消息id: {}", topicServices, exception.getMessage(), asyncActionToken.getMessageId());
                }
            };

            IMqttMessageListener registerListener = (topic, message) -> {
                // 当有服务被注册时 解析该服务并且记录下来
                String msg = new String(message.getPayload());
                ServiceInfo serviceInfo = objectMapper.readValue(msg, ServiceInfo.class);

                doRegister(serviceInfo);
                doRegisteredPlugins.complete(self);

                log.info("核心主题收到消息 {} {}", topic, msg);

                // 更新保留消息
                String servicesJson = objectMapper.writeValueAsString(services);
                String topicFix = topicServices + topicMonitorExt;
                mqttAsyncClient.publish(topicFix, servicesJson.getBytes(), 2, true, null, new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        log.info("保留消息发送成功");
                    }

                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        log.info("保留消息发送失败: {}", exception.getMessage());
                    }
                });
            };

            mqttAsyncClient.subscribe(topicServices, 2, this, watchRegisterAction, registerListener);

            // handle 类似扩展钩子
            mqttAsyncClient.subscribe(topicHandles, 2, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    log.info("订阅核心主题 {} 完成", topicHandles);
                }

                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    log.error("订阅核心主题 {} 出错 {} 消息id: {}", topicHandles, exception.getMessage(), asyncActionToken.getMessageId());
                }
            }, (topic, message) -> {
                log.info("核心主题收到消息 {} {}", topic, new String(message.getPayload()));
            });
        } catch (MqttException e) {
            log.error("订阅核心主题出错： {}", e.getMessage());
        }
    }

    /**
     * 注册服务逻辑
     *
     * @param serviceInfo 服务信息
     */
    protected void doRegister(ServiceInfo serviceInfo) {
        // todo 可能需要提取出几个hook队列 方便做插件或者插入业务逻辑
        // 判断服务列表中是否存在相关服务模块名称
        if (services.containsKey(serviceInfo.getName())) {
            // 说明之前已经注册过相关
            // 之前的方法列表
            ConcurrentHashMap<String, ConcurrentLinkedQueue<ServiceInfo>> lastMethodList = services.get(serviceInfo.getName());

            serviceInfo.getMethods().forEach(method -> {
                if (lastMethodList.containsKey(method)) {
                    // 之前有过相关方法
                    ConcurrentLinkedQueue<ServiceInfo> serviceInfos = lastMethodList.get(method);
                    serviceInfos.removeIf(sinfo -> sinfo.getServiceId().equals(serviceInfo.getServiceId()));
                    serviceInfos.add(serviceInfo);
                } else {
                    // 之前没有过相关方法
                    ConcurrentLinkedQueue<ServiceInfo> methodList = new ConcurrentLinkedQueue<>();
                    methodList.add(serviceInfo);

                    lastMethodList.put(method, methodList);
                }
            });
        } else {
            // 刚刚注册
            ConcurrentLinkedQueue<ServiceInfo> serviceList = new ConcurrentLinkedQueue<>();
            serviceList.add(serviceInfo);

            ConcurrentHashMap<String, ConcurrentLinkedQueue<ServiceInfo>> methodList = new ConcurrentHashMap<>();
            // 一个服务可能包含多个方法
            serviceInfo.getMethods().forEach(method -> methodList.put(method, serviceList));

            services.put(serviceInfo.getName(), methodList);
        }
    }

    public ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentLinkedQueue<ServiceInfo>>> getServices() {
        return services;
    }

    public CompletableFuture<CoreTask> getDoRegisteredPlugins() {
        return doRegisteredPlugins;
    }
}
